-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce Row
format backed by raw bytes
#1782
Conversation
Currently, the row format has not been hooked with the rest of the codebase. I'm not sure if it's appropriate to have its own PR or should it accompanied with a use case, such as in |
Cargo.toml
Outdated
|
||
[patch.crates-io] | ||
arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "731e132489b99cd688f884642cf20de52aed24d0" } | ||
parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "731e132489b99cd688f884642cf20de52aed24d0" } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Relies on apache/arrow-rs@e375bba, will remove this once we have arrow 9.0.1 released.
datafusion/src/row/bitmap/mod.rs
Outdated
|
||
//! General utilities for null bit section handling | ||
//! | ||
//! Note: this is a tailored version based on [arrow2 bitmap utils](https://github.com/jorgecarleitao/arrow2/tree/main/src/bitmap/utils) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW this appears to itself be a copy of https://docs.rs/arrow/latest/arrow/util/bit_util/index.html
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The bitmap is rewritten on top of arrow/util/bit_util
, along with a much-simplified version of fmt
.
Thank @yjshen -- I look forward to reviewing this carefully later today 👍 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is extremely cool 👍
#[test] | ||
#[should_panic(expected = "supported(schema)")] | ||
fn test_unsupported_type_write() { | ||
let a: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8])); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW it would be really cool to support this as IOx uses it and it is just an Int64Array
with a different logical type, but I can always add later 😁
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I add this as a TODO item and will add this later.
datafusion/src/row/mod.rs
Outdated
fn type_width(dt: &DataType) -> usize { | ||
use DataType::*; | ||
if var_length(dt) { | ||
return 8; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should probably be std::mem::size_of<u64>
or ideally a varlena
offset type alias
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great idea! I temporarily use size_of::<u64>()
, and I can make it a type parameter for RowWriter
and RowReader
as we do for StringArray and LargeStringArray, for memory-saving purposes.
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
//! Accessing row from raw bytes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a pretty picture would be very helpful, showing how data is encoded
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is one for consieration:
Row Layout
┌────────────────┬──────────────────────────┬───────────────────────┐ ┌───────────────────────┐
│Validity Bitmask│ Fixed Width Field │ Variable Width Field │ ... │ vardata area │
│ (byte aligned) │ (native type width) │(len + vardata offset) │ │ (variable length) │
└────────────────┴──────────────────────────┴───────────────────────┘ └───────────────────────┘
For example, given the schema (Int8, Float32, Utf8, Utf8)
Encoding the tuple (1, NULL, "FooBar", "baz")
Requires 35 bytes as shown
┌────────┬────────┬──────────────┬──────────────────────┬──────────────────────┬───────────────────────┐
│0b000110│ 0x01 │ 0x00000000 │0x00000000 0x00000006│0x00000006 0x00000003│ FooBarbaz │
└────────┴────────┴──────────────┴──────────────────────┴──────────────────────┴───────────────────────┘
0 1 2 10 18 26 35
Validity Int8 Float32 Field Utf8 Field 1 Utf8 Field 2 Variable length
Mask Field (4 bytes) Offset: 0 Offset: 6 area
(1 byte) (1 byte) Size: 6 Size: 3 (9 bytes)
(8 bytes) (8 bytes)
Also attaching the monopic file in case anyone finds that useful:
drawing.zip
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The diagram is excellent! I put a to-do item yesterday but haven't got a chance to finish it yet. Do you mind I put this into the doc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've adapted the fig a little bit and put it on the module doc. Many thanks, @alamb!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The diagram is excellent! I put a to-do item yesterday but haven't got a chance to finish it yet. Do you mind I put this into the doc?
I don't mind at all -- that is why I made it :)
get_idx!(i32, self, idx, 4) | ||
} | ||
|
||
fn get_date64(&self, idx: usize) -> i64 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps these methods should assert that idx
is actually the corresponding type. Otherwise this is effectively reinterpreting memory as different primitives, which whilst technically safe, is borderline unsafe 😀
datafusion/src/row/writer.rs
Outdated
} | ||
|
||
fn set_offset_size(&mut self, idx: usize, size: usize) { | ||
let offset_and_size: u64 = (self.varlena_offset << 32 | size) as u64; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably panic if size
or verlana_offset
are too large, on a related note - perhaps they should be u32
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I started going through the code @yjshen and it is very neat. Thank you!
I think it would help me to see some example of how we intend to use this to hone the interface.
The main usecases explained on #1708 are sort
and grouping
and I think would be used slightly differently.
For example, for sorting I imagine code that may look like
// get NULLS FIRST / LAST / ASC / DESC for each column
let sort_opts: Vec<&SortOptions> = sort_exprs.iter().map(|s| &s.opts).collect();
let sort_expr_batches: RecordBatch = sort_exprs.eval(input_batch)?;
// build "sort keys" that will order in the same way as the original row values
let sort_key: Rows = RowWriter::new()
.for_sorting(sort_expr_batches, sort_opts)
.build();
And then sorting (or merging) could proceed by sorting the appropriate part of the [u8]
using memcmp
d (no type dispatch needed) and take
kernels can be used to form the final arrays
There is no need to reform the Array's from the sort key
grouping is a little different as the group key is part of the final output and only equality / non equality is important (not ordering). In that case perhaps having a mut Rows
would help:
let mut output_group_keys: Rows = ...;
// for each batch
// compute group key exprs
let group_expr_batch: RecordBatch = group_exprs.eval(input_batch)?;
let group_keys: Rows = RowWriter::new()
.for_grouping(group_expr_batch)
.build();
for key in group_keys {
let accum_index = !if hash_table.contains(key) {
output_group_keys.push(key); // copy bytes
hash_table.add(key, output_group_keys.len()); // remember what index this key is
} else {
hash_table.get(key)
}
// somehow get the accumulator state (ideally could also be in the output_group_keys)
// and update
}
datafusion/src/row/writer.rs
Outdated
} | ||
|
||
/// Stitch attributes of tuple in `batch` at `row_idx` and returns the tuple width | ||
fn write_row( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think having a vectorized version of this would also be helpful so an entire record batch could be converted at once
Thanks a lot for the detailed review @tustvold I made several changes, including bitmap rewrite, size rounding, some docs, and made the rest of the great suggestions as TODOs in the PR desc, in case it slipped away silently. |
Thanks @alamb for the write-up of row use cases. [Use directly] The current row implementation is mostly targeted at payload use cases, that we do not update or check by index after the first write, and only decompose to record batch at last. This is the case for sort payload, hash aggregate composed grouping key (we can directly compare raw bytes for equality), hash join key, and join payload. [Minor adapt] We should change it a little bit by adhering to word-aligned initializing and updating for aggregation state (for CPU friendly), much as you suggested:
[Minor adapt] For composite sort key with no varlena, we shall remove the null-bits part, padding null attributes bytes as all 0xFF or all 0x00 (according to null first or null last sort option), and do raw bytes comparison. [NOT FIT] For composite sort key, if var length attributes (varlena) exist and not the last, direct comparison of raw bytes of the current row format doesn't fit. We need to store varlena in place, padding all sorting keys to the longest width, on which we could compare directly using raw bytes. |
I am not sure about sorting payloads. It seems to me like copying around the sort payload will potentially be quite ineffecient Consider a table like CREATE TABLE sales (
value float,
first_name varchar(2000),
last_name varchar(2000),
address varchar(2000)
) And a query like SELECT * from sales order by value; In this case only value needs to be compared, and the payload may be substantial I thought the current state of the art was to do something like
This copies the payload columns only once If you instead use the Row to hold the payload, you end up
Which results in copying the payloads twice - and for large tables this is a substantial overhead. However, I agree a format like this be helpful for storing hash aggregate composed grouping keys, join keys (and maybe intermediate aggregates) I'll give this PR a good look tomorrow morning |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is a great start @yjshen -- thank you.
I do have some concerns about this data structure (especially with respect to storing multiple rows using variable length fields) but I think we can iterate on it in follow on PRs as we try and use this structure in DataFusion.
In terms of next steps, how about we pick one operation (such as either Sort or GroupHash) to migrate to use this Row implementation with? I bet we will end up refining the interface
//! The null bit set is used for null tracking and is aligned to 1-byte. It stores | ||
//! one bit per field. | ||
//! | ||
//! In the region of the values, we store the fields in the order they are defined in the schema. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This describes a variable length tuple -- so if you pack a bunch of Row
s together in memory it will not be possible to find some arbitrary location quickly
For example, finding where Row3
starts in the following picture needs to scan all columns of Row1 and Row2
┌─────────────────────────────────┐
│ Row1 │
├──────────┬──────────────────────┤
│ Row2 │ Row3 │
├──────────┴─┬────────────────────┤
│ Row4 │ Row5 │
└────────────┴────────────────────┘
The benefit of this strategy is that tuple construction will be very fast and memory usage optimal
There are other strategies that have a fixed width tuples, as discussed on #1708 that have benefits (though are likely not as memory optimal)
I think getting optimal performance will come from being able to vectorize many operations for which fixed sized tuples are compelling:
┌───────────┬──┐ ┌─────────────┐
│ Row1 │ ├──────┐ │ │
├───────────┼──┤ │ ┌──────┼▶ │
│ Row2 │ │──────┼┐├──────┼─▶ │
├───────────┼──┤ │││ │ │
│ Row3 │ │──────┼┼┤ │ variable │
├───────────┼──┤ └┼┼─────▶│ length area │
│ Row4 │ │───────┼┘ │ │
├───────────┼──┤ └──────▶│ │
│ Row5 │ │───────────┐ │ │
└───────────┴──┘ └───┼───────▶ │
└─────────────┘
Maybe I can find some time this weekend to play around with some ideas
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One classic database technique to pack such data into a single block (rather than a separate variable length area) is to preallocate the page (e.g. 32K or something) and then write rows into the front of the page, but filling the variable length area from the back.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
finding where Row3 starts in the following picture needs to scan all columns of Row1 and Row2
I think since we are doing in-memory processing, we can actually store each row's starting offset as a separate vector. just like the method shows:
pub fn write_batch_unchecked(
output: &mut [u8],
offset: usize,
batch: &RecordBatch,
row_idx: usize,
schema: Arc<Schema>,
) -> Vec<usize> {
let mut writer = RowWriter::new(&schema);
let mut current_offset = offset;
let mut offsets = vec![];
for cur_row in row_idx..batch.num_rows() {
offsets.push(current_offset);
let row_width = write_row(&mut writer, cur_row, batch);
output[current_offset..current_offset + row_width]
.copy_from_slice(writer.get_row());
current_offset += row_width;
writer.reset()
}
offsets
}
I'm thinking of just keeping the offset vector we got while writing, and using it hereafter.
I think getting optimal performance will come from being able to vectorize many operations for which fixed sized tuples are compelling:
I'm afraid it will be hard to vectorize the operation on a row format since row width may easily exceed the SIMD lane?
filling the variable length area from the back.
Yes, I'm aware of the strategy. but we are using rows mainly during execution, unlike the DBMS systems using this to keep tuples in long-term stores, I think we can just store offset separately in a vector?
//! In the region of the values, we store the fields in the order they are defined in the schema. | ||
//! - For fixed-length, sequential access fields, we store them directly. | ||
//! E.g., 4 bytes for int and 1 byte for bool. | ||
//! - For fixed-length, update often fields, we store one 8-byte word per field. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really see a notion of "update often" appearing in this code. Maybe it is future work
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's not implemented yet. As discussed in [Minor adapt]
for aggregation state.
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
//! Accessing row from raw bytes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is one for consieration:
Row Layout
┌────────────────┬──────────────────────────┬───────────────────────┐ ┌───────────────────────┐
│Validity Bitmask│ Fixed Width Field │ Variable Width Field │ ... │ vardata area │
│ (byte aligned) │ (native type width) │(len + vardata offset) │ │ (variable length) │
└────────────────┴──────────────────────────┴───────────────────────┘ └───────────────────────┘
For example, given the schema (Int8, Float32, Utf8, Utf8)
Encoding the tuple (1, NULL, "FooBar", "baz")
Requires 35 bytes as shown
┌────────┬────────┬──────────────┬──────────────────────┬──────────────────────┬───────────────────────┐
│0b000110│ 0x01 │ 0x00000000 │0x00000000 0x00000006│0x00000006 0x00000003│ FooBarbaz │
└────────┴────────┴──────────────┴──────────────────────┴──────────────────────┴───────────────────────┘
0 1 2 10 18 26 35
Validity Int8 Float32 Field Utf8 Field 1 Utf8 Field 2 Variable length
Mask Field (4 bytes) Offset: 0 Offset: 6 area
(1 byte) (1 byte) Size: 6 Size: 3 (9 bytes)
(8 bytes) (8 bytes)
Also attaching the monopic file in case anyone finds that useful:
drawing.zip
@yjshen let me know if you want to make any changes to this PR otherwise I'll merge it in and we can iterate from there |
Thanks @alamb for all the thoughts!
I've listed several TODOs in the PR description, will do as follow-ups. I think the PR is ready to be merged now. |
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
//! Accessing row from raw bytes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The diagram is excellent! I put a to-do item yesterday but haven't got a chance to finish it yet. Do you mind I put this into the doc?
I don't mind at all -- that is why I made it :)
Thanks @yjshen . I didn't spare time to see this wonderful ticket, will enjoy it over the weekend. |
Which issue does this PR close?
Closes #1708 .
What changes are included in this PR?
A row format backed by raw bytes:
Each tuple consists of up to three parts: [null bit set] [values] [var length data]
The null bit set is used for null tracking and is aligned to 1-byte. It stores one bit per field.
In the region of the values, we store the fields in the order they are defined in the schema.
Are there any user-facing changes?
No.
Move todo items to #1861 for tracking.